package com.bosssoft.platform.fasttcc.log.file;

import com.bosssoft.platform.fasttcc.TccTransaction;
import com.bosssoft.platform.fasttcc.Xid;
import com.bosssoft.platform.fasttcc.util.UnmapMappedBuffer;
import com.jfireframework.baseutil.StringUtil;
import com.jfireframework.baseutil.TRACEID;
import com.jfireframework.baseutil.reflect.ReflectUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ArchiveWorker
{
    // Charset used inside this class to turn stored bytes into Strings (debug logging and toString()).
    Charset CHARSET = Charset.forName("utf8");

    // true while no archive task submitted by start() is running; volatile because the flag is
    // written from the executor thread and read through isIdle().
    private volatile     boolean         idle            = true;
    // Directory in which Archiving.log, Archived.log and Archived.log.tmp are looked up and written.
    private              String          baseDir;
    // Supplies the JDBC connections used to delete finished transactions from the fasttcc table.
    private              DataSource      dataSource;
    // Single-threaded executor on which start() runs doArchive().
    // NOTE(review): nothing visible in this class shuts the executor down - confirm the owner's lifecycle.
    private              ExecutorService executorService = Executors.newSingleThreadExecutor();
    private static final Logger          LOGGER          = LoggerFactory.getLogger(ArchiveWorker.class);

    /**
     * Creates a worker bound to one log directory and one database.
     *
     * @param baseDir    directory in which the archive log files are looked up and written
     * @param dataSource provider of the connections used to delete finished transactions from the log table
     */
    public ArchiveWorker(String baseDir, DataSource dataSource)
    {
        this.dataSource = dataSource;
        this.baseDir = baseDir;
    }

    /**
     * Reports whether the worker is currently free.
     *
     * @return {@code true} when no archive task started through {@link #start()} is in progress
     */
    public boolean isIdle()
    {
        return this.idle;
    }

    /**
     * Submits one archive pass ({@link #doArchive()}) to the worker's executor and marks the worker busy
     * until that pass has ended.
     * <p>
     * The busy flag is always released when the pass ends, whether it succeeded or failed, and a failure
     * is logged here because the {@code Future} returned by the executor is not retained by anyone.
     *
     * @throws IllegalStateException if a previously started pass has not ended yet
     */
    public void start()
    {
        if (idle == false)
        {
            throw new IllegalStateException("archive worker is already running");
        }
        idle = false;
        try
        {
            executorService.submit(new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        doArchive();
                    }
                    catch (Throwable e)
                    {
                        // Without this the failure would only be stored in the discarded Future.
                        LOGGER.error("traceId:{} archive task failed", TRACEID.currentTraceId(), e);
                    }
                    finally
                    {
                        // Release the flag even on failure, otherwise start() could never be called again.
                        idle = true;
                    }
                }
            });
        }
        catch (RuntimeException e)
        {
            // The task was never accepted (e.g. rejected by the executor), so nothing will reset the flag.
            idle = true;
            throw e;
        }
    }

    /**
     * Runs one archive pass over the files in {@code baseDir}.
     * <ol>
     * <li>Loads the transactions stored in {@code Archived.log}, if that file exists.</li>
     * <li>Applies the records of {@code Archiving.log} on top of them via {@code readArchivingFile}.</li>
     * <li>Writes the remaining transactions to {@code Archived.log.tmp} via {@code writeToTmpFile}.</li>
     * <li>Deletes {@code Archiving.log} and {@code Archived.log}, then renames the temporary file to
     * {@code Archived.log} if the temporary file exists.</li>
     * </ol>
     * A failed delete of a file that still exists, or a failed rename, is reported at ERROR level: a
     * left-over {@code Archiving.log} would be applied again by the next pass, and a failed rename leaves
     * the merged state only in the temporary file.
     *
     * @return the transactions that remain after the pass
     */
    public Collection<TccTransactionInfo> doArchive()
    {
        String traceId       = TRACEID.currentTraceId();
        File   archivedFile  = new File(baseDir + File.separator + "Archived.log");
        File   archivingFile = new File(baseDir + File.separator + "Archiving.log");
        File   tmpFile       = new File(baseDir + File.separator + "Archived.log.tmp");
        Collection<TccTransactionInfo> exists;
        if (archivedFile.exists())
        {
            exists = readArchivedLogFile();
            LOGGER.debug("traceId:{} 从Archived.log文件读取到的归档TCC事务个数:{}", traceId, exists.size());
        }
        else
        {
            exists = new LinkedList<>();
            LOGGER.debug("traceId:{} 当前不存在Archived.log文件，忽略", traceId);
        }
        exists = readArchivingFile(exists);
        LOGGER.debug("traceId:{} 读取Archiving.log文件后，剩余待归档TCC事务个数:{}", traceId, exists.size());
        writeToTmpFile(exists);
        boolean delete = archivingFile.delete();
        LOGGER.debug("traceId:{} 删除Archiving.log文件成功:{}", traceId, delete);
        if (delete == false && archivingFile.exists())
        {
            LOGGER.error("traceId:{} could not delete {}; its records will be applied again by the next archive pass", traceId, archivingFile);
        }
        delete = archivedFile.delete();
        LOGGER.debug("traceId:{} 删除Archived.log文件成功:{}", traceId, delete);
        if (delete == false && archivedFile.exists())
        {
            LOGGER.error("traceId:{} could not delete {}", traceId, archivedFile);
        }
        if (tmpFile.exists())
        {
            boolean rename = tmpFile.renameTo(archivedFile);
            LOGGER.debug("traceId:{} 临时文件重名为Archived.log成功:{}", traceId, rename);
            if (rename == false)
            {
                LOGGER.error("traceId:{} could not rename {} to {}; the archived transactions remain in the temporary file", traceId, tmpFile, archivedFile);
            }
        }
        return exists;
    }

    /**
     * Serializes the given transactions into {@code Archived.log.tmp} under {@code baseDir}.
     * <p>
     * Layout of one transaction, in write order: global id bytes, state (1 byte), role (1 byte),
     * propagated-value length (2 bytes) followed by the value, local transaction count (1 byte) followed by
     * {branch id bytes, preTxIndex (1 byte), txIndex (1 byte)} per entry, invoke count (1 byte) followed by
     * {sign length (2 bytes), sign, associated tx index (1 byte), complete-stage branch id bytes,
     * params length (4 bytes), params} per entry, remote resource count (1 byte) followed by
     * {identifier length (2 bytes), identifier} per entry.
     * <p>
     * Nothing is written, and no file is created, when the collection is {@code null} or empty.
     * Any failure is logged and rethrown through {@code ReflectUtil.throwException}.
     *
     * @param tccTransactionInfos transactions to persist; may be {@code null} or empty
     */
    private void writeToTmpFile(Collection<TccTransactionInfo> tccTransactionInfos)
    {
        String traceId = TRACEID.currentTraceId();
        if (tccTransactionInfos == null || tccTransactionInfos.isEmpty())
        {
            LOGGER.debug("traceId:{} 不存在执行中的事务，无需创建归档文件", traceId);
            return;
        }
        File file = new File(baseDir + File.separator + "Archived.log.tmp");
        LOGGER.debug("traceId:{} 准备将归档TCC事务写入文件", traceId);
        try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw"))
        {
            // "rw" mode keeps existing content: without truncation a longer left-over temporary file
            // would keep its old tail, which would later be parsed as additional records.
            randomAccessFile.setLength(0);
            for (TccTransactionInfo each : tccTransactionInfos)
            {
                randomAccessFile.write(each.getXid().getGlobalId());
                randomAccessFile.writeByte(each.getState());
                randomAccessFile.writeByte(each.getRole());
                byte[] propagated = each.getPropagatedByValueBytes();
                if (propagated == null || propagated.length == 0)
                {
                    randomAccessFile.writeShort(0);
                }
                else
                {
                    randomAccessFile.writeShort(checkedUnsigned("propagated value length", propagated.length, 0xffff));
                    randomAccessFile.write(propagated);
                }
                randomAccessFile.writeByte(checkedUnsigned("local transaction count", each.getLocalTransactionInfos().size(), 0xff));
                for (LocalTransactionInfo localTransactionInfo : each.getLocalTransactionInfos())
                {
                    randomAccessFile.write(localTransactionInfo.getBranchId());
                    randomAccessFile.writeByte(localTransactionInfo.getPreTxIndex());
                    randomAccessFile.writeByte(localTransactionInfo.getTxIndex());
                }
                randomAccessFile.writeByte(checkedUnsigned("tcc invoke count", each.getTccInvokeInfos().size(), 0xff));
                for (TccInvokeInfo tccInvokeInfo : each.getTccInvokeInfos())
                {
                    randomAccessFile.writeShort(checkedUnsigned("method sign length", tccInvokeInfo.getSignBytes().length, 0xffff));
                    randomAccessFile.write(tccInvokeInfo.getSignBytes());
                    randomAccessFile.writeByte(tccInvokeInfo.getAssociativeTxIndex());
                    randomAccessFile.write(tccInvokeInfo.getCompleteStageXidBranchId());
                    randomAccessFile.writeInt(tccInvokeInfo.getParams().length);
                    randomAccessFile.write(tccInvokeInfo.getParams());
                }
                randomAccessFile.writeByte(checkedUnsigned("remote resource count", each.getRemoteResourceInfos().size(), 0xff));
                for (RemoteResourceInfo remoteResourceInfo : each.getRemoteResourceInfos())
                {
                    randomAccessFile.writeShort(checkedUnsigned("remote identifier length", remoteResourceInfo.getIdentifierBytes().length, 0xffff));
                    randomAccessFile.write(remoteResourceInfo.getIdentifierBytes());
                }
            }
            LOGGER.debug("traceId:{} 归档事务写入临时文件完毕", traceId);
        }
        catch (Throwable e)
        {
            LOGGER.error("traceId:{} 归档事务写入临时文件出现未知异常", traceId, e);
            ReflectUtil.throwException(e);
        }
    }

    /**
     * Guards a count or length that is stored in a fixed-width unsigned field: writing a larger value would
     * silently keep only its low bits and make the file unreadable.
     *
     * @param what  description of the value, used in the error message
     * @param value value about to be written
     * @param max   largest value the field can hold
     * @return {@code value} unchanged
     * @throws IllegalStateException if {@code value} is negative or greater than {@code max}
     */
    private static int checkedUnsigned(String what, int value, int max)
    {
        if (value < 0 || value > max)
        {
            throw new IllegalStateException(what + " " + value + " does not fit the archive file format (max " + max + ")");
        }
        return value;
    }

    /**
     * Applies the records of {@code Archiving.log} on top of the transactions already loaded from
     * {@code Archived.log}.
     * <p>
     * Each record starts with a one-byte schema id that is resolved through {@code LogRecordSchema.valueOf}.
     * A create record adds a new ACTIVE transaction, the register records attach local transactions,
     * TCC invocations or remote resources to an existing transaction, and a state update changes its state.
     * Transactions that reach {@code TccTransaction.FINISHED} are removed from the result and their rows are
     * deleted from the log table through {@code deleteLogTableItem}.
     * <p>
     * 16-bit lengths are read as unsigned values.
     * NOTE(review): the writer of Archiving.log is not part of this file - confirm it stores the lengths
     * the same way as the Archived.log writer does.
     *
     * @param collection transactions read from {@code Archived.log}; may be {@code null}
     * @return {@code collection} itself when {@code Archiving.log} does not exist, otherwise the
     * transactions that are still unfinished after all records were applied
     * @throws IllegalStateException if a record refers to a transaction that is neither archived nor
     * created earlier in the file (previously an unexplained NullPointerException)
     */
    private Collection<TccTransactionInfo> readArchivingFile(Collection<TccTransactionInfo> collection)
    {
        String traceId       = TRACEID.currentTraceId();
        File   archivingFile = new File(baseDir + File.separator + "Archiving.log");
        if (archivingFile.exists() == false)
        {
            LOGGER.debug("traceId:{} 不存在Archiving.log文件，直接返回", traceId);
            return collection;
        }
        Map<Xid, TccTransactionInfo> map = new HashMap<>();
        if (collection != null)
        {
            for (TccTransactionInfo each : collection)
            {
                map.put(each.getXid(), each);
                LOGGER.debug("traceId:{} 当前已经从Archived.log文件中读取到TCC事务:{}", traceId, each.getXid());
            }
        }
        List<TccTransactionInfo> deletes = new LinkedList<>();
        MappedByteBuffer         buffer  = null;
        LOGGER.debug("traceId:{} 准备开始读取Archiiving.log文件", traceId);
        try (RandomAccessFile accessFile = new RandomAccessFile(archivingFile, "r"); FileChannel channel = accessFile.getChannel())
        {
            buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            while (buffer.hasRemaining())
            {
                byte            schema          = buffer.get();
                LogRecordSchema logRecordSchema = LogRecordSchema.valueOf(schema);
                switch (logRecordSchema)
                {
                    case CREATE_TCC_TRANSACTION:
                    {
                        byte[] globalId = new byte[8];
                        buffer.get(globalId);
                        byte   role  = buffer.get();
                        byte[] value = new byte[buffer.getShort() & 0xffff];
                        buffer.get(value);
                        TccTransactionInfo transactionInfo = new TccTransactionInfo();
                        transactionInfo.setPropagatedByValueBytes(value);
                        transactionInfo.setRole(role);
                        transactionInfo.setState(TccTransaction.ACTIVE);
                        transactionInfo.setXid(createGlobalID(globalId));
                        map.put(transactionInfo.getXid(), transactionInfo);
                        LOGGER.debug("traceId:{} 读取到TCC事务创建日志，xid:{}，role:{}。", traceId, StringUtil.toHexString(globalId), role);
                        break;
                    }
                    case REGISTER_lOCAL_TRANSACTION:
                    {
                        byte[] globalId = new byte[8];
                        byte[] branchId = new byte[8];
                        buffer.get(globalId);
                        buffer.get(branchId);
                        byte preTxIndex = buffer.get();
                        byte txIndex    = buffer.get();
                        TccTransactionInfo tccTransactionInfo = requireTransaction(map, createGlobalID(globalId), logRecordSchema);
                        tccTransactionInfo.addLocalTransactionInfo(buildLocalTransactionInfo(branchId, preTxIndex, txIndex));
                        break;
                    }
                    case REGISTER_TCC_INVOKE:
                    {
                        byte[] globalId = new byte[8];
                        buffer.get(globalId);
                        Xid    xid        = createGlobalID(globalId);
                        byte[] methodSign = new byte[buffer.getShort() & 0xffff];
                        buffer.get(methodSign);
                        byte   associatedTxIndex     = buffer.get();
                        byte[] completeStageBranchId = new byte[8];
                        buffer.get(completeStageBranchId);
                        byte[] params = new byte[buffer.getInt()];
                        buffer.get(params);
                        TccInvokeInfo tccInvokeInfo = new TccInvokeInfo();
                        tccInvokeInfo.setAssociativeTxIndex(associatedTxIndex);
                        tccInvokeInfo.setCompleteStageXidBranchId(completeStageBranchId);
                        tccInvokeInfo.setParams(params);
                        tccInvokeInfo.setSignBytes(methodSign);
                        requireTransaction(map, xid, logRecordSchema).addTccInvokeInfo(tccInvokeInfo);
                        break;
                    }
                    case REGISTER_REMOTE_RESOURCE:
                    {
                        byte[] globalId = new byte[8];
                        buffer.get(globalId);
                        byte[] identifier = new byte[buffer.getShort() & 0xffff];
                        buffer.get(identifier);
                        RemoteResourceInfo remoteResourceInfo = new RemoteResourceInfo();
                        remoteResourceInfo.setIdentifierBytes(identifier);
                        requireTransaction(map, createGlobalID(globalId), logRecordSchema).addRemoteResourceInfo(remoteResourceInfo);
                        break;
                    }
                    case UPDATE_TRANSACTION_STATE:
                    {
                        byte[] globalId = new byte[8];
                        buffer.get(globalId);
                        byte state = buffer.get();
                        Xid  xid   = createGlobalID(globalId);
                        LOGGER.debug("traceId:{} 读取到事务状态变更日志，事务xid：{},变更后状态:{}", traceId, xid, state);
                        TccTransactionInfo tccTransactionInfo = requireTransaction(map, xid, logRecordSchema);
                        tccTransactionInfo.setState(state);
                        if (state == TccTransaction.FINISHED)
                        {
                            // A finished transaction no longer needs archiving; remember it for the table clean-up.
                            map.remove(xid);
                            deletes.add(tccTransactionInfo);
                        }
                        break;
                    }
                }
            }
            deleteLogTableItem(deletes);
            return map.values();
        }
        catch (Throwable e)
        {
            // presumably rethrows e; the return below only satisfies the compiler - confirm against ReflectUtil
            ReflectUtil.throwException(e);
            return null;
        }
        finally
        {
            if (buffer != null)
            {
                UnmapMappedBuffer.unmap(buffer);
            }
        }
    }

    /**
     * Looks up the transaction a log record refers to and fails with a descriptive message when it is unknown.
     *
     * @param map    transactions known so far, keyed by xid
     * @param xid    id taken from the record
     * @param schema type of the record being processed, used in the error message
     * @return the matching transaction, never {@code null}
     * @throws IllegalStateException if no transaction is registered under {@code xid}
     */
    private TccTransactionInfo requireTransaction(Map<Xid, TccTransactionInfo> map, Xid xid, LogRecordSchema schema)
    {
        TccTransactionInfo found = map.get(xid);
        if (found == null)
        {
            throw new IllegalStateException("Archiving.log record " + schema + " refers to unknown TCC transaction " + xid + " (neither archived nor created earlier in the file, or already finished)");
        }
        return found;
    }

    /**
     * Deletes the rows of the given transactions from the {@code fasttcc} table in one JDBC batch, using the
     * hex form of each global id as the {@code global_id} value.
     * <p>
     * Returns immediately, without opening a connection, when there is nothing to delete.
     * A {@link SQLException} is logged at ERROR level and rethrown through {@code ReflectUtil.throwException}.
     *
     * @param deletes finished transactions whose rows should be removed; may be {@code null} or empty
     */
    private void deleteLogTableItem(Collection<ArchiveWorker.TccTransactionInfo> deletes)
    {
        if (deletes == null || deletes.isEmpty())
        {
            return;
        }
        String traceId = TRACEID.currentTraceId();
        try (Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement("delete from fasttcc where global_id=?"))
        {
            for (TccTransactionInfo each : deletes)
            {
                String globalId = StringUtil.toHexString(each.getXid().getGlobalId());
                preparedStatement.setString(1, globalId);
                preparedStatement.addBatch();
                LOGGER.debug("traceId:{} 从日志表删除记录:{}", traceId, globalId);
            }
            preparedStatement.executeBatch();
        }
        catch (SQLException e)
        {
            // A failed delete is an error, not diagnostic detail, so it must be visible at the default log level.
            LOGGER.error("traceId:{} 日志表删除数据出现未知异常", traceId, e);
            ReflectUtil.throwException(e);
        }
    }

    /**
     * Wraps the three values of a local-transaction record in a {@code LocalTransactionInfo}.
     *
     * @param branchId   branch id bytes; the array is stored as given, not copied
     * @param preTxIndex value stored as the info's preTxIndex
     * @param txIndex    value stored as the info's txIndex
     * @return a new, fully populated info object
     */
    private LocalTransactionInfo buildLocalTransactionInfo(byte[] branchId, byte preTxIndex, byte txIndex)
    {
        final LocalTransactionInfo info = new LocalTransactionInfo();
        info.setTxIndex(txIndex);
        info.setPreTxIndex(preTxIndex);
        info.setBranchId(branchId);
        return info;
    }

    /**
     * Reads every transaction stored in {@code Archived.log} under {@code baseDir}.
     * <p>
     * The file is parsed in the order {@code writeToTmpFile} writes it: global id (8 bytes), state, role,
     * propagated value (2-byte length + bytes), local transactions (1-byte count, then 8-byte branch id,
     * preTxIndex, txIndex each), TCC invocations (1-byte count, then 2-byte sign length + sign, associated
     * tx index, 8-byte complete-stage branch id, 4-byte params length + params each) and remote resources
     * (1-byte count, then 2-byte identifier length + identifier each).
     * <p>
     * Counts and 16-bit lengths are read as unsigned values because the writer stores the low 8 / 16 bits
     * of non-negative ints. Every entry gets its own branch-id array: reusing one array for all entries of
     * a transaction would make them all show the last id that was read.
     *
     * @return the transactions in file order
     */
    private List<TccTransactionInfo> readArchivedLogFile()
    {
        String                   traceId     = TRACEID.currentTraceId();
        File                     archivedLog = new File(baseDir + File.separator + "Archived.log");
        MappedByteBuffer         buffer      = null;
        List<TccTransactionInfo> list        = new LinkedList<>();
        try (RandomAccessFile randomAccessFile = new RandomAccessFile(archivedLog, "r"); FileChannel channel = randomAccessFile.getChannel())
        {
            LOGGER.debug("traceId:{} 准备读取Archived.log文件，位置:{}", traceId, archivedLog.getAbsolutePath());
            buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            while (buffer.hasRemaining())
            {
                byte[] globalId = new byte[8];
                buffer.get(globalId);
                int    state                  = buffer.get();
                byte   role                   = buffer.get();
                byte[] propagatedByValueBytes = new byte[buffer.getShort() & 0xffff];
                buffer.get(propagatedByValueBytes);
                TccTransactionInfo tccTransactionInfo = new TccTransactionInfo();
                tccTransactionInfo.setState(state);
                tccTransactionInfo.setRole(role);
                tccTransactionInfo.setPropagatedByValueBytes(propagatedByValueBytes);
                tccTransactionInfo.setXid(createGlobalID(globalId));
                list.add(tccTransactionInfo);
                LOGGER.debug("traceId:{} 读取到TCC事务:{}", traceId, tccTransactionInfo);

                int localCount = buffer.get() & 0xff;
                for (int i = 0; i < localCount; i++)
                {
                    // Fresh array per entry: the info object keeps the reference it is given.
                    byte[] branchId = new byte[8];
                    buffer.get(branchId);
                    byte preTxIndex = buffer.get();
                    byte txIndex    = buffer.get();
                    LocalTransactionInfo localTransactionInfo = buildLocalTransactionInfo(branchId, preTxIndex, txIndex);
                    tccTransactionInfo.addLocalTransactionInfo(localTransactionInfo);
                    LOGGER.debug("traceId:{} 读取到本地事务:{}", traceId, localTransactionInfo);
                }

                int invokeCount = buffer.get() & 0xff;
                for (int i = 0; i < invokeCount; i++)
                {
                    byte[] methodSign = new byte[buffer.getShort() & 0xffff];
                    buffer.get(methodSign);
                    int    associativeTxIndex       = buffer.get();
                    // Fresh array per entry, for the same reason as the branch id above.
                    byte[] completeStageXidBranchId = new byte[8];
                    buffer.get(completeStageXidBranchId);
                    byte[] params = new byte[buffer.getInt()];
                    buffer.get(params);
                    TccInvokeInfo tccInvokeInfo = new TccInvokeInfo();
                    tccInvokeInfo.setCompleteStageXidBranchId(completeStageXidBranchId);
                    tccInvokeInfo.setSignBytes(methodSign);
                    tccInvokeInfo.setAssociativeTxIndex(associativeTxIndex);
                    tccInvokeInfo.setParams(params);
                    tccTransactionInfo.addTccInvokeInfo(tccInvokeInfo);
                    LOGGER.debug("traceId:{} 读取到TCCInvoke信息", traceId);
                }

                int remoteCount = buffer.get() & 0xff;
                for (int i = 0; i < remoteCount; i++)
                {
                    byte[] identifier = new byte[buffer.getShort() & 0xffff];
                    buffer.get(identifier);
                    RemoteResourceInfo remoteResourceInfo = new RemoteResourceInfo();
                    remoteResourceInfo.setIdentifierBytes(identifier);
                    tccTransactionInfo.addRemoteResourceInfo(remoteResourceInfo);
                    LOGGER.debug("traceId:{} 读取到远端资源:{}", traceId, new String(identifier, CHARSET));
                }
            }
            return list;
        }
        catch (IOException e)
        {
            // presumably rethrows e; the return below only satisfies the compiler - confirm against ReflectUtil
            ReflectUtil.throwException(e);
            return null;
        }
        finally
        {
            if (buffer != null)
            {
                UnmapMappedBuffer.unmap(buffer);
            }
        }
    }

    /**
     * Wraps raw global-id bytes in an {@code Xid} that can be used as a map key.
     *
     * @param globalId global id bytes; the array is stored as given, not copied
     * @return an {@code XidInfo} carrying {@code globalId}
     */
    private Xid createGlobalID(byte[] globalId)
    {
        final XidInfo wrapped = new XidInfo();
        wrapped.globalId = globalId;
        return wrapped;
    }

    /**
     * Minimal {@code Xid} that carries only a global id; it serves as the map key while log records are
     * merged. Equality and hash code are based on the content of the global id bytes.
     */
    class XidInfo implements Xid
    {
        private byte[] globalId;

        @Override
        public byte[] getGlobalId()
        {
            return globalId;
        }

        /**
         * @return always {@code null}; this implementation stores no branch id
         */
        @Override
        public byte[] getBranchId()
        {
            return null;
        }

        /**
         * Folds the first eight id bytes into an int (byte i XOR byte i+4). Ids that are {@code null} or
         * shorter than eight bytes fall back to {@link Arrays#hashCode(byte[])} instead of failing.
         */
        @Override
        public int hashCode()
        {
            if (globalId == null || globalId.length < 8)
            {
                return Arrays.hashCode(globalId);
            }
            int hashcode = (((globalId[0] ^ globalId[4]) & 0xff) << 24)//
                    | (((globalId[1] ^ globalId[5]) & 0xff) << 16)//
                    | (((globalId[2] ^ globalId[6]) & 0xff) << 8)//
                    | (((globalId[3] ^ globalId[7]) & 0xff) << 0);
            return hashcode;
        }

        /**
         * Two ids are equal when the other object is an {@code Xid} whose global id has the same length and
         * the same bytes. A {@code null} or differently sized id on either side yields {@code false}
         * rather than an exception or a prefix match.
         */
        @Override
        public boolean equals(Object o)
        {
            if (o instanceof Xid == false)
            {
                return false;
            }
            return Arrays.equals(this.globalId, ((Xid) o).getGlobalId());
        }

        @Override
        public String toString()
        {
            StringBuilder s = new StringBuilder();
            s.append(StringUtil.toHexString(globalId));
            return s.toString();
        }
    }

    /**
     * Mutable in-memory form of one archived TCC transaction: its id, role, state, propagated value and the
     * local transactions, TCC invocations and remote resources registered on it.
     */
    class TccTransactionInfo
    {
        private Xid                        xid;
        private byte                       role;
        private List<LocalTransactionInfo> localTransactionInfos = new ArrayList<>();
        private List<RemoteResourceInfo>   remoteResourceInfos   = new ArrayList<>();
        private List<TccInvokeInfo>        tccInvokeInfos        = new ArrayList<>();
        private int                        state;
        private byte[]                     propagatedByValueBytes;

        // ---- identity -------------------------------------------------------------------------------

        public Xid getXid()
        {
            return this.xid;
        }

        public void setXid(Xid xid)
        {
            this.xid = xid;
        }

        // ---- scalar attributes ----------------------------------------------------------------------

        public int getState()
        {
            return this.state;
        }

        public void setState(int state)
        {
            this.state = state;
        }

        /** @return the role byte widened to {@code int} */
        public int getRole()
        {
            return this.role;
        }

        public void setRole(byte role)
        {
            this.role = role;
        }

        public byte[] getPropagatedByValueBytes()
        {
            return this.propagatedByValueBytes;
        }

        public void setPropagatedByValueBytes(byte[] propagatedByValueBytes)
        {
            this.propagatedByValueBytes = propagatedByValueBytes;
        }

        // ---- registered children; the getters hand out the internal lists, not copies ---------------

        public List<LocalTransactionInfo> getLocalTransactionInfos()
        {
            return this.localTransactionInfos;
        }

        public void addLocalTransactionInfo(ArchiveWorker.LocalTransactionInfo localTransactionInfo)
        {
            this.localTransactionInfos.add(localTransactionInfo);
        }

        public List<TccInvokeInfo> getTccInvokeInfos()
        {
            return this.tccInvokeInfos;
        }

        public void addTccInvokeInfo(TccInvokeInfo tccInvokeInfo)
        {
            this.tccInvokeInfos.add(tccInvokeInfo);
        }

        public List<RemoteResourceInfo> getRemoteResourceInfos()
        {
            return this.remoteResourceInfos;
        }

        public void addRemoteResourceInfo(RemoteResourceInfo remoteResourceInfo)
        {
            this.remoteResourceInfos.add(remoteResourceInfo);
        }

        /** Renders the id, role and state; the registered children are not included. */
        @Override
        public String toString()
        {
            StringBuilder text = new StringBuilder("TccTransactionInfo{");
            text.append("xid=").append(xid);
            text.append(", role=").append(role);
            text.append(", state=").append(state);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Mutable holder for one registered TCC invocation: the method signature bytes, the serialized
     * parameters, the index of the associated transaction and the branch id used in the complete stage.
     */
    class TccInvokeInfo
    {

        private byte[] completeStageXidBranchId;
        private int    associativeTxIndex;
        private byte[] params;
        private byte[] signBytes;

        public void setCompleteStageXidBranchId(byte[] completeStageXidBranchId)
        {
            this.completeStageXidBranchId = completeStageXidBranchId;
        }

        public void setAssociativeTxIndex(int associativeTxIndex)
        {
            this.associativeTxIndex = associativeTxIndex;
        }

        public void setParams(byte[] params)
        {
            this.params = params;
        }

        public void setSignBytes(byte[] signBytes)
        {
            this.signBytes = signBytes;
        }

        public byte[] getCompleteStageXidBranchId()
        {
            return completeStageXidBranchId;
        }

        public int getAssociativeTxIndex()
        {
            return associativeTxIndex;
        }

        public byte[] getParams()
        {
            return params;
        }

        public byte[] getSignBytes()
        {
            return signBytes;
        }

        /**
         * Renders the branch id as hex and the signature bytes as text. Fields that have not been set are
         * printed as {@code null} instead of raising an exception, so the object can always be logged.
         */
        @Override
        public String toString()
        {
            String branchIdHex = completeStageXidBranchId == null ? null : StringUtil.toHexString(completeStageXidBranchId);
            String methodSign  = signBytes == null ? null : new String(signBytes, CHARSET);
            return "TccInvokeInfo{" + "completeStageXidBranchId=" + branchIdHex + ", associativeTxIndex=" + associativeTxIndex + ",methodSign:" + methodSign + '}';
        }
    }

    /**
     * Mutable holder for one registered local transaction: its branch id and two index values.
     */
    class LocalTransactionInfo
    {

        private byte[] branchId;
        private int    preTxIndex;
        private int    txIndex;

        public byte[] getBranchId()
        {
            return this.branchId;
        }

        public void setBranchId(byte[] branchId)
        {
            this.branchId = branchId;
        }

        public int getPreTxIndex()
        {
            return this.preTxIndex;
        }

        public void setPreTxIndex(int preTxIndex)
        {
            this.preTxIndex = preTxIndex;
        }

        public int getTxIndex()
        {
            return this.txIndex;
        }

        public void setTxIndex(int txIndex)
        {
            this.txIndex = txIndex;
        }

        /** Renders the branch id as hex followed by both indexes. */
        @Override
        public String toString()
        {
            StringBuilder text = new StringBuilder("LocalTransactionInfo{branchId=");
            text.append(StringUtil.toHexString(branchId));
            text.append(", preTxIndex=").append(preTxIndex);
            text.append(", txIndex=").append(txIndex);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Mutable holder for the identifier bytes of one registered remote resource.
     */
    class RemoteResourceInfo
    {

        private byte[] identifierBytes;

        /** @return the stored identifier array itself (not a copy), or {@code null} if none was set */
        public byte[] getIdentifierBytes()
        {
            return this.identifierBytes;
        }

        /** @param identifierBytes identifier to keep; the array reference is stored as given */
        public void setIdentifierBytes(byte[] identifierBytes)
        {
            this.identifierBytes = identifierBytes;
        }
    }
}
